import com.atguigu.gmall.realtime.app.BaseSQLApp;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink SQL job that registers two Doris-backed tables, selects the rows of the first one
 * whose {@code siteid} is 1, and inserts them into the second one.
 *
 * <p>NOTE(review): both table definitions use the same {@code 'table.identifier'}
 * ({@code test.table1}), so the job reads from and writes to the same Doris table —
 * confirm this is intended.
 *
 * @Author lzc
 * @Date 2023/7/11 13:46
 */
public class DorisSql extends BaseSQLApp {

    /** Column definitions shared by the {@code t1} and {@code t2} DDL statements. */
    private static final String TABLE_COLUMNS =
        "  siteid int, " +
            "  citycode smallint, " +
            "  username string, " +
            "  pv bigint ";

    /** Connector options, up to but excluding the password entry, shared by both DDLs. */
    private static final String CONNECTION_OPTIONS =
        " 'connector' = 'doris'," +
            " 'fenodes' = 'hadoop162:7030'," +
            "  'table.identifier' = 'test.table1'," +
            "  'username' = 'root',";

    /** DDL for {@code t1}, the table the query reads from. */
    private static final String SOURCE_TABLE_DDL =
        "create table t1(" + TABLE_COLUMNS + ")with(" + CONNECTION_OPTIONS +
            "  'password' = 'aaaaaa'" +
            ")";

    /**
     * DDL for {@code t2}, the table the query result is inserted into.
     *
     * <p>The JSON stream-load properties ({@code 'sink.properties.format'},
     * {@code 'sink.properties.read_json_by_line'}) are not set here.
     * NOTE(review): {@code 'sink.buffer-size'} is 4086 — possibly meant to be 4096; confirm.
     */
    private static final String SINK_TABLE_DDL =
        "create table t2(" + TABLE_COLUMNS + ")with(" + CONNECTION_OPTIONS +
            "  'password' = 'aaaaaa', " +
            "  'sink.buffer-count' = '4', " +
            "  'sink.buffer-size' = '4086'," +
            // Two-phase commit can be turned off during the testing phase to make testing easier.
            "  'sink.enable-2pc' = 'false' " +
            ")";

    /**
     * Entry point: creates the application and hands control to {@code start}, which is
     * inherited from {@link BaseSQLApp}.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        DorisSql app = new DorisSql();
        // Argument meanings are defined by BaseSQLApp.start — presumably port, parallelism
        // and job name; verify against the base class.
        app.start(30000, 2, "DorisSql");
    }

    /**
     * Registers the two tables, then copies every {@code t1} row with {@code siteid = 1}
     * into {@code t2}.
     *
     * @param env  the streaming execution environment (not used by this job)
     * @param tEnv the table environment used to run the DDL, the query and the insert
     */
    @Override
    public void handle(StreamExecutionEnvironment env,
                       StreamTableEnvironment tEnv) {
        tEnv.executeSql(SOURCE_TABLE_DDL);

        Table siteOneRows = tEnv.sqlQuery("select * from t1 where siteid=1");

        tEnv.executeSql(SINK_TABLE_DDL);

        siteOneRows.executeInsert("t2");
    }
}
